[jira] [Created] (FLINK-5168) Scaladoc annotation link use [[]] instead of {@link}

2016-11-26 Thread shijinkui (JIRA)
shijinkui created FLINK-5168:


 Summary: Scaladoc annotation link use [[]] instead of {@link}
 Key: FLINK-5168
 URL: https://issues.apache.org/jira/browse/FLINK-5168
 Project: Flink
  Issue Type: Improvement
  Components: Scala API
Reporter: shijinkui


{@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)}

==>

[[StreamExecutionEnvironment#readFile(FileInputFormat, String, 
FileProcessingMode, long)]]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2875: [FLINK-5168] Scaladoc annotation link use [[]] ins...

2016-11-26 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2875

[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}

`{@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)}`

==>

`[[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]]`

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-5168] 
Scaladoc annotation link use [[]] instead of {@link}")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink FLINK-5168

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2875.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2875


commit c5deec42aa40efd788fa961d2979447f606a3cab
Author: shijinkui 
Date:   2016-11-26T08:14:13Z

[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5168) Scaladoc annotation link use [[]] instead of {@link}

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697531#comment-15697531
 ] 

ASF GitHub Bot commented on FLINK-5168:
---

GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2875

[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}

`{@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)}`

==>

`[[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]]`

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-5168] 
Scaladoc annotation link use [[]] instead of {@link}")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink FLINK-5168

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2875.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2875


commit c5deec42aa40efd788fa961d2979447f606a3cab
Author: shijinkui 
Date:   2016-11-26T08:14:13Z

[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}




> Scaladoc annotation link use [[]] instead of {@link}
> 
>
> Key: FLINK-5168
> URL: https://issues.apache.org/jira/browse/FLINK-5168
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Reporter: shijinkui
>
> {@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)}
> ==>
> [[StreamExecutionEnvironment#readFile(FileInputFormat, String, 
> FileProcessingMode, long)]]





[jira] [Updated] (FLINK-5166) TextInputFormatTest.testNestedFileRead

2016-11-26 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5166:
-
Description: 
mvn clean package -P \\!scala-2.11,scala-2.11  -U

Failed tests:
  TextInputFormatTest.testNestedFileRead:140 Test erroneous

Tests run: 846, Failures: 1, Errors: 0, Skipped: 0



  was:
mvn clean package -P \!scala-2.11,scala-2.11  -U

Failed tests:
  TextInputFormatTest.testNestedFileRead:140 Test erroneous

Tests run: 846, Failures: 1, Errors: 0, Skipped: 0




> TextInputFormatTest.testNestedFileRead
> --
>
> Key: FLINK-5166
> URL: https://issues.apache.org/jira/browse/FLINK-5166
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: shijinkui
>
> mvn clean package -P \\!scala-2.11,scala-2.11  -U
> Failed tests:
>   TextInputFormatTest.testNestedFileRead:140 Test erroneous
> Tests run: 846, Failures: 1, Errors: 0, Skipped: 0





[jira] [Updated] (FLINK-5166) TextInputFormatTest.testNestedFileRead

2016-11-26 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5166:
-
Description: 
`mvn clean package -P \!scala-2.11,scala-2.11  -U`

Failed tests:
  TextInputFormatTest.testNestedFileRead:140 Test erroneous

Tests run: 846, Failures: 1, Errors: 0, Skipped: 0



  was:
mvn clean package -P \\!scala-2.11,scala-2.11  -U

Failed tests:
  TextInputFormatTest.testNestedFileRead:140 Test erroneous

Tests run: 846, Failures: 1, Errors: 0, Skipped: 0




> TextInputFormatTest.testNestedFileRead
> --
>
> Key: FLINK-5166
> URL: https://issues.apache.org/jira/browse/FLINK-5166
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: shijinkui
>
> `mvn clean package -P \!scala-2.11,scala-2.11  -U`
> Failed tests:
>   TextInputFormatTest.testNestedFileRead:140 Test erroneous
> Tests run: 846, Failures: 1, Errors: 0, Skipped: 0





[GitHub] flink issue #2874: [FLINK-5167] StreamExecutionEnvironment set function retu...

2016-11-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2874
  
The PR changes several `@Public` and `@PublicEvolving` methods.

We do not touch `@Public` interfaces for 1.x releases, and we also try to avoid 
modifying `@PublicEvolving` interfaces except for very good reasons. We have a 
Maven plugin that prevents changes that break the binary compatibility of 
methods and classes annotated with `@Public`. 
By building the code before opening a pull request, you can detect such 
issues early.

I don't think we can or should merge this PR.




[jira] [Commented] (FLINK-5167) StreamExecutionEnvironment's set function return `this` instead of void

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697575#comment-15697575
 ] 

ASF GitHub Bot commented on FLINK-5167:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2874
  
The PR changes several `@Public` and `@PublicEvolving` methods.

We do not touch `@Public` interfaces for 1.x releases, and we also try to avoid 
modifying `@PublicEvolving` interfaces except for very good reasons. We have a 
Maven plugin that prevents changes that break the binary compatibility of 
methods and classes annotated with `@Public`. 
By building the code before opening a pull request, you can detect such 
issues early.

I don't think we can or should merge this PR.


> StreamExecutionEnvironment's set function return `this` instead of void
> ---
>
> Key: FLINK-5167
> URL: https://issues.apache.org/jira/browse/FLINK-5167
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: shijinkui
>
> for example :
> public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
>   config.setNumberOfExecutionRetries(numberOfExecutionRetries);
> }
> change to:
> public StreamExecutionEnvironment setNumberOfExecutionRetries(int 
> numberOfExecutionRetries) {
>   config.setNumberOfExecutionRetries(numberOfExecutionRetries);
>   return this;
> }
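
The proposal above amounts to the standard fluent-setter pattern. A minimal
self-contained sketch (with hypothetical `Env` and `Config` stand-ins, not
Flink's actual classes):

```java
// Hypothetical sketch of the fluent-setter pattern proposed in FLINK-5167.
// Env and Config are illustrative stand-ins, not Flink's actual API.
class Config {
    private int numberOfExecutionRetries;
    void setNumberOfExecutionRetries(int n) { numberOfExecutionRetries = n; }
    int getNumberOfExecutionRetries() { return numberOfExecutionRetries; }
}

public class Env {
    private final Config config = new Config();

    // Returning `this` instead of void lets callers chain setters.
    public Env setNumberOfExecutionRetries(int n) {
        config.setNumberOfExecutionRetries(n);
        return this;
    }

    public Env setParallelism(int p) {
        // further settings would go here
        return this;
    }

    public Config getConfig() { return config; }

    public static void main(String[] args) {
        // Chained configuration keeps all settings in one expression.
        Config c = new Env()
            .setNumberOfExecutionRetries(3)
            .setParallelism(4)
            .getConfig();
        System.out.println(c.getNumberOfExecutionRetries());
    }
}
```

The downside, as noted in the review, is that changing a `void` return type to
the class type breaks binary compatibility for existing callers.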





[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead

2016-11-26 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697607#comment-15697607
 ] 

Chesnay Schepler commented on FLINK-5166:
-

Can you include the exception that you got?

> TextInputFormatTest.testNestedFileRead
> --
>
> Key: FLINK-5166
> URL: https://issues.apache.org/jira/browse/FLINK-5166
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Tests
>Reporter: shijinkui
>
> `mvn clean package -P \!scala-2.11,scala-2.11  -U`
> Failed tests:
>   TextInputFormatTest.testNestedFileRead:140 Test erroneous
> Tests run: 846, Failures: 1, Errors: 0, Skipped: 0





[jira] [Updated] (FLINK-5166) TextInputFormatTest.testNestedFileRead

2016-11-26 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5166:

Component/s: (was: Build System)
 Tests
 Batch Connectors and Input/Output Formats

> TextInputFormatTest.testNestedFileRead
> --
>
> Key: FLINK-5166
> URL: https://issues.apache.org/jira/browse/FLINK-5166
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Tests
>Reporter: shijinkui
>
> `mvn clean package -P \!scala-2.11,scala-2.11  -U`
> Failed tests:
>   TextInputFormatTest.testNestedFileRead:140 Test erroneous
> Tests run: 846, Failures: 1, Errors: 0, Skipped: 0





[GitHub] flink issue #2874: [FLINK-5167] StreamExecutionEnvironment set function retu...

2016-11-26 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2874
  
> The PR changes several @Public and @PublicEvolving methods.

Hi @fhueske,
today, while setting some configuration, I found that some setter methods 
cannot be chained. I wanted to make all of them `return this` so that 
configuration calls can be chained conveniently; chaining keeps the settings 
together instead of scattering them everywhere.

It is only a convenience, not essential. You can close this; the failing 
build is noisy.




[jira] [Commented] (FLINK-5167) StreamExecutionEnvironment's set function return `this` instead of void

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697631#comment-15697631
 ] 

ASF GitHub Bot commented on FLINK-5167:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2874
  
> The PR changes several @Public and @PublicEvolving methods.

Hi @fhueske,
today, while setting some configuration, I found that some setter methods 
cannot be chained. I wanted to make all of them `return this` so that 
configuration calls can be chained conveniently; chaining keeps the settings 
together instead of scattering them everywhere.

It is only a convenience, not essential. You can close this; the failing 
build is noisy.


> StreamExecutionEnvironment's set function return `this` instead of void
> ---
>
> Key: FLINK-5167
> URL: https://issues.apache.org/jira/browse/FLINK-5167
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: shijinkui
>
> for example :
> public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
>   config.setNumberOfExecutionRetries(numberOfExecutionRetries);
> }
> change to:
> public StreamExecutionEnvironment setNumberOfExecutionRetries(int 
> numberOfExecutionRetries) {
>   config.setNumberOfExecutionRetries(numberOfExecutionRetries);
>   return this;
> }





[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead

2016-11-26 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697644#comment-15697644
 ] 

shijinkui commented on FLINK-5166:
--

Debugging, I found that there are four tmp files in the `expectedFiles` list 
but only two files in `paths`, so when 
`assertTrue(expectedFiles.get(i).equals(paths.get(i)));` executes, the lookup 
returns null for the extra indexes.

The tmp files look like: 
flink/flink-java/tmp/first/TextInputFormatTest3439492861909451525.tmp

Strangely, when I delete the tmp files and rerun the test, it passes.

I think we can clear the directory before the test; I added two lines of code in the PR.
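
The cleanup described above can be sketched like this (a hedged, self-contained
example; the actual PR and `TextInputFormatTest` may do it differently, and the
class name here is illustrative):

```java
import java.io.File;

public class TmpDirCleaner {
    // Delete any leftover *.tmp files in the given directory so that files
    // from a previous run cannot leak into the expected-files list.
    static int clearTmpFiles(File dir) {
        int deleted = 0;
        File[] children = dir.listFiles();
        if (children == null) {
            return 0; // directory does not exist or is not readable
        }
        for (File f : children) {
            if (f.isFile() && f.getName().endsWith(".tmp") && f.delete()) {
                deleted++;
            }
        }
        return deleted;
    }
}
```

Cleaning up before (or after) each test run keeps the test independent of
stale state on disk, which is the root cause of the flaky assertion.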


> TextInputFormatTest.testNestedFileRead
> --
>
> Key: FLINK-5166
> URL: https://issues.apache.org/jira/browse/FLINK-5166
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Tests
>Reporter: shijinkui
>
> `mvn clean package -P \!scala-2.11,scala-2.11  -U`
> Failed tests:
>   TextInputFormatTest.testNestedFileRead:140 Test erroneous
> Tests run: 846, Failures: 1, Errors: 0, Skipped: 0





[GitHub] flink pull request #2876: [FLINK-5166] TextInputFormatTest.testNestedFileRea...

2016-11-26 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2876

[FLINK-5166] TextInputFormatTest.testNestedFileRead


- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink FLINK-5166-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2876


commit 085b0fb94e20877c2eec3d2facd593045e26fb1b
Author: shijinkui 
Date:   2016-11-26T09:42:17Z

[FLINK-5166] TextInputFormatTest.testNestedFileRead






[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697655#comment-15697655
 ] 

ASF GitHub Bot commented on FLINK-5166:
---

GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2876

[FLINK-5166] TextInputFormatTest.testNestedFileRead


- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink FLINK-5166-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2876


commit 085b0fb94e20877c2eec3d2facd593045e26fb1b
Author: shijinkui 
Date:   2016-11-26T09:42:17Z

[FLINK-5166] TextInputFormatTest.testNestedFileRead




> TextInputFormatTest.testNestedFileRead
> --
>
> Key: FLINK-5166
> URL: https://issues.apache.org/jira/browse/FLINK-5166
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Tests
>Reporter: shijinkui
>
> `mvn clean package -P \!scala-2.11,scala-2.11  -U`
> Failed tests:
>   TextInputFormatTest.testNestedFileRead:140 Test erroneous
> Tests run: 846, Failures: 1, Errors: 0, Skipped: 0





[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89246096
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, 
StreamTableSource}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConverters._
+
+class StreamProjectableTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends StreamScan(cluster, traitSet, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new StreamProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
--- End diff --

We would prefer not to override `computeSelfCost` for `DataStreamRel`.




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671736
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
 ---
@@ -131,6 +101,33 @@ class TableSourceITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testCsvTableSourceWithProjection(): Unit = {
+
+val csvTable = CommonTestData.getCsvTableSource
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+tEnv.registerTableSource("csvTable", csvTable)
+
+val results = tEnv
+  .scan("csvTable")
+  .select('last, 'id, 'score)
+  .sortPartition(0, Order.ASCENDING)
--- End diff --

Do we have to `sortPartition` here? It seems unnecessary.




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89670471
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, 
StreamProjectableTableSourceScan}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, 
StreamTableSource}
+
+class StreamProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val original = project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
+val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+dataSetTable match {
+  case tst: TableSourceTable =>
+tst.tableSource match {
+  case s: StreamTableSource[_] =>
+s.isInstanceOf[ProjectableTableSource[_]]
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val scan: TableScan = call.rel(1).asInstanceOf[TableScan]
+
+val convInput = RelOptRule.convert(project.getInput, 
DataStreamConvention.INSTANCE)
+val traitSet: RelTraitSet = 
project.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+val newRel = new StreamProjectableTableSourceScan(
+  scan.getCluster,
+  traitSet,
+  convInput,
+  project.getProjects,
+  project.getRowType,
+  scan.getTable
+)
+call.transformTo(newRel)
+  }
+
+}
+
+object StreamProjectableTableSourceScanRule {
+  val INSTANCE: RelOptRule = new StreamProjectableTableSourceScanRule(
+operand(classOf[LogicalProject], operand(classOf[TableScan], none())),
+"StreamTableSourceProjectRule")
--- End diff --

Please rename the description to match the class name.





[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697950#comment-15697950
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89670435
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, 
DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
+class BatchProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject]
+val original: RelNode = 
project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
+val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+dataSetTable match {
+  case tst: TableSourceTable =>
+tst.tableSource match {
+  case s: BatchTableSource[_] =>
+s.isInstanceOf[ProjectableTableSource[_]]
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val scan: TableScan = call.rel(1).asInstanceOf[TableScan]
+
+val convInput = RelOptRule.convert(scan, DataSetConvention.INSTANCE)
+val traitSet: RelTraitSet = 
project.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+val newRel = new BatchProjectableTableSourceScan(
+  scan.getCluster,
+  traitSet,
+  convInput,
+  project.getProjects,
+  project.getRowType,
+  scan.getTable
+)
+call.transformTo(newRel)
+  }
+}
+
+object BatchProjectableTableSourceScanRule {
+  val INSTANCE = new BatchProjectableTableSourceScanRule(
+operand(classOf[LogicalProject], operand(classOf[TableScan], none())),
+"BatchTableSourceProjectRule")
--- End diff --

Please rename the description to match the class name.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementation 
> that support projection push-down.
> The interface could look as follows
> {code}
> def trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> 
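
Independent of the Calcite plumbing shown in the diffs above, the effect of
projection push-down can be sketched in a few lines (illustrative names only,
not Flink's actual `ProjectableTableSource` API): instead of reading every
column and projecting afterwards, the source is told up front which field
indexes the query needs.

```java
// Illustrative sketch of projection push-down over an in-memory row source.
// ProjectingSource and scan() are hypothetical names for this example.
public class ProjectingSource {
    private final Object[][] rows;

    ProjectingSource(Object[][] rows) { this.rows = rows; }

    // "Push down" the projection: emit only the requested columns,
    // in the requested order, instead of full rows.
    Object[][] scan(int[] fields) {
        Object[][] out = new Object[rows.length][];
        for (int r = 0; r < rows.length; r++) {
            Object[] projected = new Object[fields.length];
            for (int c = 0; c < fields.length; c++) {
                projected[c] = rows[r][fields[c]];
            }
            out[r] = projected;
        }
        return out;
    }
}
```

For a real source such as `CsvTableSource`, the payoff is that unneeded
columns are never parsed or materialized at all, which is why the cost model
may need tweaking to prefer the projected scan.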

[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
 ---
@@ -102,11 +107,43 @@ class CsvTableSource(
 *   Do not use it in Table API programs.
 */
   override def getDataStream(streamExecEnv: StreamExecutionEnvironment): 
DataStream[Row] = {
-streamExecEnv.createInput(createCsvInput(), returnType)
+streamExecEnv.createInput(createCsvInput(), projectionReturnTypeWithOrder.getOrElse(returnType))
+  }
+
+  /** Returns a [[TableSource]] with ability to project fields */
+  override def setProjection(fields: Array[Int]): CsvTableSource = {
+val mask = new Array[Boolean](fieldNames.length)
+fields.foreach(mask(_) = true)
+val orderedReturnType = fields.map(fieldTypes(_))
+val indexDiff = fieldNames.length - fields.length
+val order = fields.map(f => f - indexDiff).zipWithIndex.sortBy(_._1).map(_._2)
+val source = copy
--- End diff --

I would like to use `newFieldNames` and `newFieldTypes` to create a new `CsvTableSource` instead of an identical copy, so that we do not need `orderedReturnType`. What do you think?
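The `order` computation in `setProjection` above is dense. Under the assumption that `fields` holds the requested original field indexes, the expression is effectively an argsort of `fields`: for each selected field in schema order, it records the position that field occupies in the request. A standalone trace with hypothetical values (not from the PR):

```scala
// Worked trace of: fields.map(f => f - indexDiff).zipWithIndex.sortBy(_._1).map(_._2)
object OrderDemo extends App {
  val fieldNames = Array("a", "b", "c", "d", "e") // 5 original fields
  val fields     = Array(4, 1, 3)                 // requested projection order

  val indexDiff = fieldNames.length - fields.length // 2
  // Subtracting a constant does not change relative ordering, so sortBy on
  // (f - indexDiff) sorts the selected fields by their original index; the
  // final map(_._2) recovers each one's position within `fields`.
  val order = fields.map(f => f - indexDiff).zipWithIndex.sortBy(_._1).map(_._2)

  println(order.mkString(",")) // 1,2,0
}
```

Seen this way, the `indexDiff` subtraction is a no-op for the sort, which supports the reviewer's suggestion that the logic could be simplified.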


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671556
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+
+import scala.collection.JavaConverters._
+
+class BatchProjectableTableSourceScan(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends BatchScan(cluster, traits, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+new BatchProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: BatchTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
--- End diff --

Please indent like `BatchTableSourceScan.translateToPlan`




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671544
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConverters._
+
+class StreamProjectableTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends StreamScan(cluster, traitSet, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+new StreamProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: StreamTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
--- End diff --

Please indent like `BatchTableSourceScan.translateToPlan`




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697938#comment-15697938 ]

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671736
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala ---
@@ -131,6 +101,33 @@ class TableSourceITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testCsvTableSourceWithProjection(): Unit = {
+
+val csvTable = CommonTestData.getCsvTableSource
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+tEnv.registerTableSource("csvTable", csvTable)
+
+val results = tEnv
+  .scan("csvTable")
+  .select('last, 'id, 'score)
+  .sortPartition(0, Order.ASCENDING)
--- End diff --

Do we have to `sortPartition` here? It seems unnecessary.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.
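The "tweak the cost model" remark above is what the PR's `computeSelfCost(...).multiplyBy(0.8)` override addresses: the projected scan is reported as cheaper than the plain scan, so a cost-based planner prefers the push-down plan. A minimal illustration of the mechanism, detached from Calcite (the `Cost` class and the numbers are hypothetical, not Calcite's actual cost model):

```scala
// Toy cost comparison: why discounting a node's self-cost steers a
// cost-based planner toward the push-down alternative.
case class Cost(rows: Double) {
  def multiplyBy(factor: Double): Cost = Cost(rows * factor)
}

object CostDemo extends App {
  val plainScan     = Cost(1000.0)            // scan reading all fields
  val projectedScan = plainScan.multiplyBy(0.8) // discounted push-down scan

  // The planner keeps the cheaper of two equivalent plans.
  val chosen = Seq(plainScan, projectedScan).minBy(_.rows)
  println(chosen) // Cost(800.0)
}
```

In Calcite the same comparison happens inside the Volcano planner between equivalent `RelNode`s, so a fixed discount like `0.8` is a blunt but effective nudge.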



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671180
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource}
+
+class StreamProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
--- End diff --

Please indent this like `BatchTableSourceScan`.




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697948#comment-15697948 ]

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89246193
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConverters._
+
+class StreamProjectableTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends StreamScan(cluster, traitSet, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+new StreamProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: StreamTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
+val projectableSource = tableSourceTable.tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+
+val tableSource = projectableSource.setProjection(indexes).asInstanceOf[StreamTableSource[_]]
+
+val config = tableEnv.getConfig
+val inputDataSet = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
--- End diff --

`inputDataSet` => `inputDataStream`


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.





[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697953#comment-15697953 ]

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671128
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala ---
@@ -102,11 +107,43 @@ class CsvTableSource(
 *   Do not use it in Table API programs.
 */
  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
-streamExecEnv.createInput(createCsvInput(), returnType)
+streamExecEnv.createInput(createCsvInput(), projectionReturnTypeWithOrder.getOrElse(returnType))
+  }
+
+  /** Returns a [[TableSource]] with ability to project fields */
+  override def setProjection(fields: Array[Int]): CsvTableSource = {
+val mask = new Array[Boolean](fieldNames.length)
+fields.foreach(mask(_) = true)
+val orderedReturnType = fields.map(fieldTypes(_))
+val indexDiff = fieldNames.length - fields.length
+val order = fields.map(f => f - indexDiff).zipWithIndex.sortBy(_._1).map(_._2)
+val source = copy
--- End diff --

I would like to use `newFieldNames` and `newFieldTypes` to create a new `CsvTableSource` instead of an identical copy, so that we do not need `orderedReturnType`. What do you think?


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.





[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89670435
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+
+class BatchProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject]
+val original: RelNode = project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
+val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+dataSetTable match {
+  case tst: TableSourceTable =>
+tst.tableSource match {
+  case s: BatchTableSource[_] =>
+s.isInstanceOf[ProjectableTableSource[_]]
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val scan: TableScan = call.rel(1).asInstanceOf[TableScan]
+
+val convInput = RelOptRule.convert(scan, DataSetConvention.INSTANCE)
+val traitSet: RelTraitSet = project.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+val newRel = new BatchProjectableTableSourceScan(
+  scan.getCluster,
+  traitSet,
+  convInput,
+  project.getProjects,
+  project.getRowType,
+  scan.getTable
+)
+call.transformTo(newRel)
+  }
+}
+
+object BatchProjectableTableSourceScanRule {
+  val INSTANCE = new BatchProjectableTableSourceScanRule(
+operand(classOf[LogicalProject], operand(classOf[TableScan], none())),
+"BatchTableSourceProjectRule")
--- End diff --

Please rename the description to match the class name.




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89246193
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConverters._
+
+class StreamProjectableTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends StreamScan(cluster, traitSet, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+new StreamProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: StreamTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
+val projectableSource = tableSourceTable.tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+
+val tableSource = projectableSource.setProjection(indexes).asInstanceOf[StreamTableSource[_]]
+
+val config = tableEnv.getConfig
+val inputDataSet = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
--- End diff --

`inputDataSet` => `inputDataStream`




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697955#comment-15697955 ]

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671437
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+
+class BatchProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject]
+val original: RelNode = project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
+val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+dataSetTable match {
+  case tst: TableSourceTable =>
+tst.tableSource match {
+  case s: BatchTableSource[_] =>
+s.isInstanceOf[ProjectableTableSource[_]]
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val scan: TableScan = call.rel(1).asInstanceOf[TableScan]
+
+val convInput = RelOptRule.convert(scan, DataSetConvention.INSTANCE)
+val traitSet: RelTraitSet = project.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+val newRel = new BatchProjectableTableSourceScan(
+  scan.getCluster,
+  traitSet,
+  convInput,
+  project.getProjects,
+  project.getRowType,
+  scan.getTable
+)
--- End diff --

I would like to move the `)` up to the end of the previous line.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.





[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697943#comment-15697943 ]

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671175
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, 
DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
+class BatchProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
--- End diff --

Please indent this like `BatchTableSourceScan`.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows:
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.
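
The proposed interface is small; here is a minimal, self-contained sketch of how a source could apply a pushed-down projection. Everything except `ProjectableTableSource` and `setProjection` (which come from the issue description) is invented for illustration:

```scala
// The trait proposed in the issue description.
trait ProjectableTableSource {
  def setProjection(fields: Array[String]): Unit
}

// Hypothetical source: names and structure are made up for this sketch.
class CsvLikeSource(allFields: Array[String]) extends ProjectableTableSource {
  // Until a projection is pushed down, all fields are emitted.
  private var projected: Array[String] = allFields

  // The optimizer rule would call this after pushing a projection into the scan.
  override def setProjection(fields: Array[String]): Unit = {
    projected = fields
  }

  // Only the projected columns would actually be read and emitted.
  def emittedFields: Array[String] = projected
}
```

A real implementation (e.g. in `CsvTableSource`) would also have to adjust its produced `TypeInformation` to the narrowed schema, which this sketch omits.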





[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697939#comment-15697939
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671603
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, 
DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
+class BatchProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject]
+val original: RelNode = 
project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
--- End diff --

The rule's operand already ensures that the first RelNode is a `LogicalProject` and 
the second is a `TableScan`, so I think we do not need to check whether `original` 
is a `TableScan` again here.
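
As a toy illustration of why the operand declaration makes the casts safe (stand-in case classes below, not Calcite's actual types): a rule constructed with `operand(classOf[LogicalProject], operand(classOf[TableScan], none()))` is only fired by the planner on rel lists with exactly that shape, roughly equivalent to this check:

```scala
// Stand-ins for Calcite RelNodes, invented for this sketch only.
sealed trait RelNode
case class LogicalProject(input: RelNode) extends RelNode
case class TableScan(table: String) extends RelNode

// The shape the planner guarantees before matches()/onMatch() are invoked:
// rel(0) is a LogicalProject and rel(1) is a TableScan.
def operandMatches(rels: List[RelNode]): Boolean = rels match {
  case (_: LogicalProject) :: (_: TableScan) :: Nil => true
  case _                                            => false
}
```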







[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697945#comment-15697945
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89246096
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, 
StreamTableSource}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConverters._
+
+class StreamProjectableTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends StreamScan(cluster, traitSet, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new StreamProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
--- End diff --

We would prefer not to override `computeSelfCost` for a DataStreamRel.







[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697947#comment-15697947
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671556
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
+import scala.collection.JavaConverters._
+
+class BatchProjectableTableSourceScan(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends BatchScan(cluster, traits, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new BatchProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: BatchTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
--- End diff --

Please indent like `BatchTableSourceScan.translateToPlan`







[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697952#comment-15697952
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671523
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
+import scala.collection.JavaConverters._
+
+class BatchProjectableTableSourceScan(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends BatchScan(cluster, traits, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new BatchProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: BatchTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
+val projectableSource = 
tableSourceTable.tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+val indexes = 
projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
--- End diff --

This is error-prone: not every project expression is a `RexInputRef` (e.g. in 
`select(a, floor(b), c * 2, 1)` only `a` is a `RexInputRef`), so an exception will 
be thrown here.

Maybe we can add a check to the rule to make sure all the projects are 
`RexInputRef`s. 
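
A self-contained sketch of the guard this comment suggests, using toy stand-ins for Calcite's `RexNode` hierarchy (the real types live in `org.apache.calcite.rex`; the case classes below are invented for illustration):

```scala
// Toy stand-ins for Calcite's RexNode hierarchy, for illustration only.
sealed trait RexNode
case class RexInputRef(index: Int) extends RexNode
case class RexCall(op: String, operands: List[RexNode]) extends RexNode
case class RexLiteral(value: Any) extends RexNode

// Push the projection into the scan only when every project expression is a
// bare field reference; otherwise the blanket cast to RexInputRef would fail.
def allFieldRefs(projects: Seq[RexNode]): Boolean =
  projects.forall(_.isInstanceOf[RexInputRef])

// Safe extraction of the projected field indexes once the guard has passed.
def projectedIndexes(projects: Seq[RexNode]): Array[Int] =
  projects.collect { case RexInputRef(i) => i }.toArray
```

Adding `allFieldRefs` to the rule's `matches()` would make a query like `select(a, floor(b), c * 2, 1)` simply not match, falling back to a normal scan plus projection instead of failing at translation time.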







[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671175
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, 
DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
+class BatchProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
--- End diff --

Please indent this like `BatchTableSourceScan`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671523
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
+import scala.collection.JavaConverters._
+
+class BatchProjectableTableSourceScan(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends BatchScan(cluster, traits, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new BatchProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: BatchTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
+val projectableSource = 
tableSourceTable.tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+val indexes = 
projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
--- End diff --

This is error-prone: not every project expression is a `RexInputRef` (e.g. in 
`select(a, floor(b), c * 2, 1)` only `a` is a `RexInputRef`), so an exception will 
be thrown here.

Maybe we can add a check to the rule to make sure all the projects are 
`RexInputRef`s. 




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671437
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, 
DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
+class BatchProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject]
+val original: RelNode = 
project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
+val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+dataSetTable match {
+  case tst: TableSourceTable =>
+tst.tableSource match {
+  case s: BatchTableSource[_] =>
+s.isInstanceOf[ProjectableTableSource[_]]
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val scan: TableScan = call.rel(1).asInstanceOf[TableScan]
+
+val convInput = RelOptRule.convert(scan, DataSetConvention.INSTANCE)
+val traitSet: RelTraitSet = 
project.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+val newRel = new BatchProjectableTableSourceScan(
+  scan.getCluster,
+  traitSet,
+  convInput,
+  project.getProjects,
+  project.getRowType,
+  scan.getTable
+)
--- End diff --

I would prefer to move the `)` up to the end of the previous line.




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697942#comment-15697942
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89670471
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, 
StreamProjectableTableSourceScan}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, 
StreamTableSource}
+
+class StreamProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val original = project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
+val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+dataSetTable match {
+  case tst: TableSourceTable =>
+tst.tableSource match {
+  case s: StreamTableSource[_] =>
+s.isInstanceOf[ProjectableTableSource[_]]
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  case _ =>
+false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val scan: TableScan = call.rel(1).asInstanceOf[TableScan]
+
+val convInput = RelOptRule.convert(project.getInput, 
DataStreamConvention.INSTANCE)
+val traitSet: RelTraitSet = 
project.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+val newRel = new StreamProjectableTableSourceScan(
+  scan.getCluster,
+  traitSet,
+  convInput,
+  project.getProjects,
+  project.getRowType,
+  scan.getTable
+)
+call.transformTo(newRel)
+  }
+
+}
+
+object StreamProjectableTableSourceScanRule {
+  val INSTANCE: RelOptRule = new StreamProjectableTableSourceScanRule(
+operand(classOf[LogicalProject], operand(classOf[TableScan], none())),
+"StreamTableSourceProjectRule")
--- End diff --

Please rename the description to match the class name.




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671731
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, 
StreamProjectableTableSourceScan}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, 
StreamTableSource}
+
--- End diff --

Please add a short comment describing what this rule is used for.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671462
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
+import scala.collection.JavaConverters._
+
+class BatchProjectableTableSourceScan(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends BatchScan(cluster, traits, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new BatchProjectableTableSourceScan(cluster,
--- End diff --

I would like to indent `cluster,` onto the next line.






[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697944#comment-15697944
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671537
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, 
StreamTableSource}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConverters._
+
+class StreamProjectableTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends StreamScan(cluster, traitSet, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new StreamProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: StreamTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
+val projectableSource = 
tableSourceTable.tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+val indexes = 
projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
--- End diff --

The same problem will happen here.
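The quoted hunk extracts the projected field positions from Calcite `RexInputRef` expressions. A minimal, self-contained sketch of that extraction step, using a toy stand-in for `RexInputRef` (an assumption; Calcite's real class carries more state):

```scala
// Stand-in for Calcite's RexInputRef: a reference to an input field by position.
case class InputRef(index: Int)

// A projection like `SELECT c, a FROM t(a, b, c)` is a list of input refs.
val projects = Seq(InputRef(2), InputRef(0))

// The scan node collects the referenced positions ...
val indexes = projects.map(_.index).toArray

// ... and each produced row is then narrowed and reordered accordingly.
val row = Array("a-val", "b-val", "c-val")
val projectedRow = indexes.map(i => row(i))
println(projectedRow.mkString(","))  // c-val,a-val
```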


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697946#comment-15697946
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671544
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.StreamTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, 
StreamTableSource}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConverters._
+
+class StreamProjectableTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType,
+table: RelOptTable)
+  extends StreamScan(cluster, traitSet, table, projectionRowType) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new StreamProjectableTableSourceScan(cluster,
+  traitSet,
+  input,
+  projects,
+  projectionRowType,
+  getTable)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: StreamTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
--- End diff --

Please indent like `BatchTableSourceScan.translateToPlan`




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697954#comment-15697954
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89670370
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, 
DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+
--- End diff --

Add a short comment describing what this rule is used for.




[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89246478
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.sources
+
+// TODO FLINK-3848
--- End diff --

Could we remove this comment?








[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671607
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import 
org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, 
StreamProjectableTableSourceScan}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, 
StreamTableSource}
+
+class StreamProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val original = project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
--- End diff --

The rule's operand pattern already makes sure that the first RelNode is a 
`LogicalProject` and the second is a `TableScan`, so I think we do not need 
to check whether `original` is a `TableScan` again here.
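The reviewer's point — that the rule's operand pattern guarantees the tree shape, making the extra type check redundant — can be illustrated with a toy model (simplified stand-in classes, not Calcite's actual `RelOptRule` API):

```scala
// Toy stand-ins for relational nodes (not Calcite classes).
sealed trait RelNode
case class Project(input: RelNode) extends RelNode
case class Scan(table: String) extends RelNode

// A rule declared over operand(Project, operand(Scan, ...)) only ever fires
// on trees of this exact shape, which is what this pattern match encodes.
def matchesProjectOverScan(root: RelNode): Boolean = root match {
  case Project(_: Scan) => true
  case _                => false
}

println(matchesProjectOverScan(Project(Scan("t"))))  // true
println(matchesProjectOverScan(Scan("t")))           // false
```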




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697951#comment-15697951
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671731
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource}
+
--- End diff --

Please add a short comment to describe what this rule is used for.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> def trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.
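To make the proposed contract concrete, here is a minimal, self-contained Java sketch of a table source that accepts a projection push-down. The `SimpleCsvSource` class and its field names are hypothetical illustrations, not Flink API; only the `setProjection(fields)` method mirrors the interface sketched in the issue.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical counterpart of the proposed ProjectableTableSource trait. */
interface ProjectableTableSource {
    void setProjection(String[] fields);
}

/** Toy source that records which columns the optimizer asked for. */
class SimpleCsvSource implements ProjectableTableSource {
    private final List<String> allFields;
    private String[] projectedFields;

    SimpleCsvSource(String... allFields) {
        this.allFields = Arrays.asList(allFields);
        this.projectedFields = allFields; // default: read every column
    }

    @Override
    public void setProjection(String[] fields) {
        // a real source would validate against its schema, as sketched here
        for (String f : fields) {
            if (!allFields.contains(f)) {
                throw new IllegalArgumentException("Unknown field: " + f);
            }
        }
        this.projectedFields = fields; // only these columns will be read
    }

    String[] getProjectedFields() {
        return projectedFields;
    }
}

public class Main {
    public static void main(String[] args) {
        SimpleCsvSource source = new SimpleCsvSource("id", "name", "price");
        // the translation rule would call this when it pushes a projection down
        source.setProjection(new String[]{"id", "price"});
        System.out.println(String.join(",", source.getProjectedFields()));
    }
}
```

The point of the interface is exactly this handshake: the planner tells the source which fields the query needs, and the source can then skip reading the rest.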



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-26 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671603
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+
+class BatchProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject]
+val original: RelNode = project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
--- End diff --

The rule's operand already makes sure that the first RelNode is a `LogicalProject` and the second 
is a `TableScan`, so I think we do not need to check whether `original` is a 
`TableScan` again here.




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697949#comment-15697949
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89246478
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.sources
+
+// TODO FLINK-3848
--- End diff --

Could we remove this comment?


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> def trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.





[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697940#comment-15697940
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r89671607
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan}
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource}
+
+class StreamProjectableTableSourceScanRule(
+  operand: RelOptRuleOperand,
+  description: String) extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+// check that table scan supports projections
+val project = call.rel(0).asInstanceOf[LogicalProject]
+val original = project.getInput.asInstanceOf[RelSubset].getOriginal
+
+original match {
+  case scan: TableScan =>
--- End diff --

The rule's operand already makes sure that the first RelNode is a `LogicalProject` and the second 
is a `TableScan`, so I think we do not need to check whether `original` is a 
`TableScan` again here.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> def trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.





[jira] [Commented] (FLINK-4692) Add tumbling group-windows for batch tables

2016-11-26 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15698090#comment-15698090
 ] 

Jark Wu commented on FLINK-4692:


Hi [~fhueske] [~twalthr], I have proposed a [design 
doc|https://docs.google.com/document/d/1lzpnNUmNzn9yuCGf1RSjHuHAWm-O_v2in7y90muXI2o/edit#]
 for this issue and made a prototype. Could you have a look at the design? Any 
feedback is welcome!



> Add tumbling group-windows for batch tables
> ---
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> Add Tumble group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  





[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15698373#comment-15698373
 ] 

ASF GitHub Bot commented on FLINK-5166:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2876
  
The test should be changed to use more specific folder names, including a 
random component. If the directory we are trying to create already exists, we 
should most definitely not just delete everything there, but instead try a 
different directory.


> TextInputFormatTest.testNestedFileRead
> --
>
> Key: FLINK-5166
> URL: https://issues.apache.org/jira/browse/FLINK-5166
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Tests
>Reporter: shijinkui
>
> `mvn clean package -P \!scala-2.11,scala-2.11  -U`
> Failed tests:
>   TextInputFormatTest.testNestedFileRead:140 Test erroneous
> Tests run: 846, Failures: 1, Errors: 0, Skipped: 0





[GitHub] flink issue #2876: [FLINK-5166] TextInputFormatTest.testNestedFileRead

2016-11-26 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2876
  
The test should be changed to use more specific folder names, including a 
random component. If the directory we are trying to create already exists, we 
should most definitely not just delete everything there, but instead try a 
different directory.
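As a sketch of the suggested fix, the JDK can generate such a unique directory name directly: `Files.createTempDirectory` appends a random component to a given prefix, so parallel or repeated test runs never collide on the same folder. The prefix string below is only an illustration, not the actual test's naming scheme.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class UniqueTestDir {
    public static void main(String[] args) throws IOException {
        // createTempDirectory adds a random suffix to the prefix, so the
        // directory is guaranteed fresh; no need to delete pre-existing data.
        Path dir = Files.createTempDirectory("TextInputFormatTest-");
        System.out.println(Files.isDirectory(dir));
        System.out.println(dir.getFileName().toString().startsWith("TextInputFormatTest-"));
        Files.delete(dir); // the test should clean the directory up afterwards
    }
}
```

Because each run gets its own directory, the test also becomes safe to run concurrently on a shared CI machine.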




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-26 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89680317
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+   final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+   final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   Thread consumeThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+   }
+   });
+   consumeThread.start();
+
+   Thread.sleep(5000);
--- End diff --

Actually, the sleep here isn't waiting for the readSequence call to finish. 
I'm waiting a bit to make sure that the consume job has fully started. It won't 
be able to read anything until new latest data is generated afterwards, which 
is done below by `DataGenerators.generateRandomizedIntegerSequence`.

So, what the test is doing is:
1. Write 50 records to each partition.
2. Commit some random offsets.
3. Start a job to read from latest in a separate thread. (should not read 
any of the previous data, offsets also ignored). The `readSequence` is expected 
to read 30 more records from each partition
4. Make sure the job

[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15698724#comment-15698724
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89680317
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+   final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+   final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   Thread consumeThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+   }
+   });
+   consumeThread.start();
+
+   Thread.sleep(5000);
--- End diff --

Actually, the sleep here isn't waiting for the readSequence call to finish. 
I'm waiting a bit to make sure that the consume job has fully started. It won't 
be able to read anything until new latest data is generated afterwards, which 
is done below by `DataGenerators.generateRandomizedIntegerSequence`.

So, what the test is doing is:
1. Write 50 records to each partition.
2. 

[jira] [Updated] (FLINK-1707) Add an Affinity Propagation Library Method

2016-11-26 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Rubió updated FLINK-1707:
---
Description: 
This issue proposes adding an implementation of the Affinity Propagation 
algorithm as a Gelly library method and a corresponding example.
The algorithm is described in paper [1] and a description of a vertex-centric 
implementation can be found in [2].

[1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
[2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf

Design doc:
https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing

Example spreadsheet:
https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing

Graph:
https://docs.google.com/drawings/d/1PC3S-6AEt2Gp_TGrSfiWzkTcL7vXhHSxvM6b9HglmtA/edit?usp=sharing

  was:
This issue proposes adding an implementation of the Affinity Propagation 
algorithm as a Gelly library method and a corresponding example.
The algorithm is described in paper [1] and a description of a vertex-centric 
implementation can be found in [2].

[1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
[2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf

Design doc:
https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing

Example spreadsheet:
https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing


> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing
> Example spreadsheet:
> https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing
> Graph:
> https://docs.google.com/drawings/d/1PC3S-6AEt2Gp_TGrSfiWzkTcL7vXhHSxvM6b9HglmtA/edit?usp=sharing


